This Lambda function automatically scrapes Common Vulnerabilities and Exposures (CVEs) from the National Vulnerability Database (NVD) and Known Exploited Vulnerabilities (KEVs) from CISA, then loads them into an Amazon Bedrock Knowledge Base for use with Retrieval-Augmented Generation (RAG).
- CVE Scraping: Fetches CVEs from the National Vulnerability Database (NVD) API
- KEV Scraping: Retrieves Known Exploited Vulnerabilities from CISA's catalog
- Automated Loading: Uploads vulnerability data to S3 and syncs with Bedrock Knowledge Base
- Scheduled Execution: Runs on a configurable schedule (daily by default)
- Optimized for RAG: Formats data specifically for retrieval and question-answering tasks
- Comprehensive Metadata: Includes CVSS scores, affected products, remediation actions, and more
```
┌─────────────────┐
│   EventBridge   │  (Scheduled trigger)
│    Schedule     │
└────────┬────────┘
         │
         v
┌─────────────────┐
│     Lambda      │
│    Function     │
├─────────────────┤
│ • CVE Scraper   │──┐
│ • KEV Scraper   │  │
│ • Bedrock Loader│  │
└─────────────────┘  │
         │           │
         v           v
┌─────────────────┐  ┌──────────────┐
│   S3 Bucket     │  │ nvd.nist.gov │
│ (Vulnerability  │  │   CISA.gov   │
│     Data)       │  └──────────────┘
└────────┬────────┘
         │
         v
┌─────────────────┐
│    Bedrock      │
│ Knowledge Base  │
│     (RAG)       │
└─────────────────┘
```
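The flow in the diagram can be sketched as a minimal handler skeleton. The stub functions below are illustrative stand-ins, not the project's actual scraper or loader API:

```python
# Sketch of the orchestration shown in the diagram above.
# The fetch_* functions are stand-ins for the real scrapers.
def fetch_cves_stub():
    # Stand-in for the NVD API scraper
    return [{"id": "CVE-2024-1234", "source": "NVD"}]

def fetch_kevs_stub():
    # Stand-in for the CISA KEV catalog scraper
    return [{"id": "CVE-2024-5678", "source": "CISA"}]

def handler(event, context):
    results = {"cves": 0, "kevs": 0}
    if event.get("scrape_cves", True):
        results["cves"] = len(fetch_cves_stub())
    if event.get("scrape_kevs", True):
        results["kevs"] = len(fetch_kevs_stub())
    # In the real function, records would be written to S3 here and a
    # Bedrock Knowledge Base ingestion job started afterwards.
    return {"statusCode": 200, "body": results}

print(handler({}, None))
```

The real handler additionally honors the `cve_limit` and date-range event parameters documented below.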
- AWS Account with appropriate permissions
- AWS CLI configured
- AWS SAM CLI installed (Installation Guide)
- Python 3.11 or later
- Bedrock Knowledge Base already created
- NVD API Key (optional, for higher rate limits) - Get one at NVD API
The project includes a complete GitLab CI/CD pipeline for automated deployments.
See GITLAB_CICD_SETUP.md for detailed setup instructions.
Quick start:
- Configure GitLab CI/CD variables (AWS credentials)
- Push to feature branch → auto-deploys to dev
- Merge to main → manual approval for production
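For reference, a pipeline matching that flow could look roughly like the fragment below; the stage names, image, and rules are assumptions, and the authoritative configuration is the repository's `.gitlab-ci.yml` and GITLAB_CICD_SETUP.md:

```yaml
# Illustrative sketch only — see GITLAB_CICD_SETUP.md for the real setup.
stages:
  - build
  - deploy

build:
  stage: build
  image: public.ecr.aws/sam/build-python3.11
  script:
    - sam build

deploy-dev:
  stage: deploy
  script:
    - sam deploy --no-confirm-changeset --stack-name cve-kev-scraper-dev
  rules:
    - if: '$CI_COMMIT_BRANCH != "main"'

deploy-prod:
  stage: deploy
  script:
    - sam deploy --no-confirm-changeset --stack-name cve-kev-scraper
  rules:
    - if: '$CI_COMMIT_BRANCH == "main"'
      when: manual
```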
Follow the steps below for manual deployment using AWS SAM CLI.
First, create a Bedrock Knowledge Base in the AWS Console:
- Go to Amazon Bedrock > Knowledge bases
- Click Create knowledge base
- Configure:
  - Name: `vulnerability-rag-kb`
  - IAM permissions: Create a new role or use an existing one
  - Embedding model (e.g., `amazon.titan-embed-text-v1`)
- Add a data source:
  - Type: S3
  - S3 URI: `s3://your-bucket-name/vulnerability-data/`
  - Chunking strategy: Default or Fixed-size (recommended)
- Note the Knowledge Base ID and Data Source ID
```bash
# Clone the repository
git clone <repository-url>
cd hackathon2025

# Install dependencies locally for testing (optional)
pip install -r requirements.txt

# Build the SAM application
sam build

# Deploy with guided prompts
sam deploy --guided
```

During deployment, provide:
- Stack Name: e.g., `cve-kev-scraper`
- AWS Region: Your preferred region
- KnowledgeBaseId: From step 1
- DataSourceId: From step 1
- S3BucketName: Bucket for storing vulnerability data (created if it does not exist)
- S3Prefix: Prefix for organizing data (default: `vulnerability-data`)
- ScheduleExpression: How often to run (default: `rate(1 day)`)
- NVDApiKey: Your NVD API key (optional but recommended)
Example:

```text
Parameter KnowledgeBaseId: KB123EXAMPLE
Parameter DataSourceId: DS456EXAMPLE
Parameter S3BucketName: my-vulnerability-data-bucket
Parameter S3Prefix: vulnerability-data
Parameter ScheduleExpression: rate(1 day)
Parameter NVDApiKey: ********
```
```bash
# Check the Lambda function
aws lambda list-functions --query "Functions[?starts_with(FunctionName, 'cve-kev-scraper')].FunctionName"

# Check the EventBridge rule
aws events list-rules --name-prefix cve-kev-scraper
```

Invoke the Lambda function manually:
```bash
# Scrape both CVEs and KEVs (default)
aws lambda invoke \
  --function-name cve-kev-scraper-CVEKEVScraperFunction-XXXXX \
  --payload '{}' \
  response.json

# Scrape only CVEs
aws lambda invoke \
  --function-name cve-kev-scraper-CVEKEVScraperFunction-XXXXX \
  --payload '{"scrape_kevs": false}' \
  response.json

# Scrape only KEVs
aws lambda invoke \
  --function-name cve-kev-scraper-CVEKEVScraperFunction-XXXXX \
  --payload '{"scrape_cves": false}' \
  response.json

# Limit CVE results and specify date range
aws lambda invoke \
  --function-name cve-kev-scraper-CVEKEVScraperFunction-XXXXX \
  --payload '{"cve_limit": 100, "cve_start_date": "2024-01-01"}' \
  response.json

# View response
cat response.json
```

Supported event parameters:

| Parameter | Type | Description | Default |
|---|---|---|---|
| `scrape_cves` | boolean | Whether to scrape CVEs | `true` |
| `scrape_kevs` | boolean | Whether to scrape KEVs | `true` |
| `cve_limit` | integer | Max CVEs to fetch | All available |
| `cve_start_date` | string | CVE start date (YYYY-MM-DD) | 30 days ago |
| `cve_end_date` | string | CVE end date (YYYY-MM-DD) | Today |
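The date defaults in the table (30 days ago and today) can be resolved up front before calling the NVD API. A minimal sketch, with an assumed helper name:

```python
from datetime import date, timedelta

def resolve_cve_window(event, today=None):
    """Fill in cve_start_date/cve_end_date defaults (last 30 days).

    Illustrative helper, not the project's actual function name.
    """
    today = today or date.today()
    start = event.get("cve_start_date") or (today - timedelta(days=30)).isoformat()
    end = event.get("cve_end_date") or today.isoformat()
    return start, end

# Pinning `today` makes the example deterministic
print(resolve_cve_window({}, today=date(2024, 6, 30)))
# → ('2024-05-31', '2024-06-30')
```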
The function runs automatically based on the ScheduleExpression parameter:
- Daily: `rate(1 day)`
- Every 12 hours: `rate(12 hours)`
- Daily at 2 AM UTC: `cron(0 2 * * ? *)`
- Weekly on Monday: `cron(0 0 ? * MON *)`

Update the schedule:

```bash
sam deploy --parameter-overrides ScheduleExpression="rate(12 hours)"
```

View logs:

```bash
aws logs tail /aws/lambda/cve-kev-scraper-CVEKEVScraperFunction-XXXXX --follow
```

Key metrics to monitor:
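Before redeploying, it can be worth sanity-checking a `rate()` expression locally, since EventBridge requires a singular unit for a value of 1 (`rate(1 day)`) and a plural unit otherwise (`rate(12 hours)`). An illustrative helper, not part of the project, that covers only the `rate()` form (not `cron()`):

```python
import re

def is_valid_rate(expr):
    # Match rate(<value> <unit>) with EventBridge's allowed units
    m = re.fullmatch(r"rate\((\d+) (minute|minutes|hour|hours|day|days)\)", expr)
    if not m:
        return False
    value, unit = int(m.group(1)), m.group(2)
    # Singular unit iff value == 1
    return (value == 1) == (not unit.endswith("s"))

print(is_valid_rate("rate(1 day)"))     # → True
print(is_valid_rate("rate(2 day)"))     # → False (should be "2 days")
```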
- Invocations: Number of times function is invoked
- Errors: Number of errors
- Duration: Execution time
- Throttles: Rate limiting issues
The deployment includes an alarm for Lambda errors. Configure SNS notifications:
```bash
# Create SNS topic
aws sns create-topic --name cve-kev-scraper-alerts

# Subscribe email
aws sns subscribe \
  --topic-arn arn:aws:sns:REGION:ACCOUNT:cve-kev-scraper-alerts \
  --protocol email \
  --notification-endpoint your-email@example.com
```

Once data is ingested, query the Knowledge Base using Bedrock:
```python
import boto3

bedrock_agent_runtime = boto3.client('bedrock-agent-runtime')

response = bedrock_agent_runtime.retrieve_and_generate(
    input={
        'text': 'What are the critical CVEs related to Apache Log4j?'
    },
    retrieveAndGenerateConfiguration={
        'type': 'KNOWLEDGE_BASE',
        'knowledgeBaseConfiguration': {
            'knowledgeBaseId': 'KB123EXAMPLE',
            'modelArn': 'arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-v2'
        }
    }
)

print(response['output']['text'])
```

Or with the AWS CLI:

```bash
aws bedrock-agent-runtime retrieve-and-generate \
  --input '{"text": "What are the known exploited vulnerabilities for Microsoft products?"}' \
  --retrieve-and-generate-configuration '{
    "type": "KNOWLEDGE_BASE",
    "knowledgeBaseConfiguration": {
      "knowledgeBaseId": "KB123EXAMPLE",
      "modelArn": "arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-v2"
    }
  }'
```

CVE document format:

```json
{
  "id": "CVE-2024-1234",
  "type": "CVE",
  "source": "NVD",
  "text": "# CVE-2024-1234\n\n## Description\n...",
  "metadata": {
    "vulnerability_id": "CVE-2024-1234",
    "vulnerability_type": "cve",
    "source": "NVD",
    "cvss_score": "9.8",
    "cvss_severity": "CRITICAL"
  }
}
```

KEV document format:

```json
{
  "id": "CVE-2024-5678",
  "type": "KEV",
  "source": "CISA",
  "text": "# Vulnerability Name\n\n## Description\n...",
  "metadata": {
    "vulnerability_id": "CVE-2024-5678",
    "vulnerability_type": "kev",
    "vendor": "Microsoft",
    "product": "Windows",
    "ransomware_use": "Known"
  }
}
```

- Lambda: Free tier includes 1M requests/month and 400,000 GB-seconds
- S3: Storage costs for vulnerability data (typically < 1 GB)
- Bedrock: Costs for embeddings and queries
- Embedding: ~$0.0001 per 1000 tokens
- Queries: Varies by model
- CloudWatch: Log storage (30-day retention)
Estimated monthly cost for daily runs: $5-20 (depending on usage)
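The `text` field in the document formats shown earlier is plain Markdown assembled from the scraped fields. A hedged sketch of how such a CVE record could be built (the builder function is illustrative; the field names follow the examples above):

```python
import json

def build_cve_document(cve_id, description, cvss_score, cvss_severity):
    """Assemble a RAG-friendly record like the CVE document format above.

    Illustrative helper — not the project's actual builder.
    """
    text = (
        f"# {cve_id}\n\n"
        f"## Description\n{description}\n\n"
        f"## CVSS\nScore: {cvss_score} ({cvss_severity})\n"
    )
    return {
        "id": cve_id,
        "type": "CVE",
        "source": "NVD",
        "text": text,
        "metadata": {
            "vulnerability_id": cve_id,
            "vulnerability_type": "cve",
            "source": "NVD",
            "cvss_score": str(cvss_score),
            "cvss_severity": cvss_severity,
        },
    }

doc = build_cve_document("CVE-2024-1234", "Example flaw.", 9.8, "CRITICAL")
print(json.dumps(doc, indent=2))
```

Keeping the human-readable Markdown in `text` and the filterable fields in `metadata` is what makes the records work well for both embedding and metadata-based retrieval filtering.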
Solution: Add an NVD API key to increase rate limits from 5 to 50 requests per 30 seconds.

```bash
sam deploy --parameter-overrides NVDApiKey="your-api-key-here"
```

Solution: Increase the Lambda timeout or reduce the data fetch range.

```yaml
# In template.yaml, increase timeout
Timeout: 900  # 15 minutes
```

Error: "ConflictException: Ingestion job already in progress"

Solution: Wait for the current ingestion job to complete, or check its status:

```python
from bedrock_loader import BedrockKnowledgeBaseLoader

loader = BedrockKnowledgeBaseLoader(
    knowledge_base_id='KB123EXAMPLE',
    data_source_id='DS456EXAMPLE',
    s3_bucket='my-bucket',
    s3_prefix='vulnerability-data'
)
status = loader.get_ingestion_job_status('JOB_ID')
print(status)
```

Run the function locally for development:

```bash
# Install dependencies
pip install -r requirements.txt

# Set environment variables
export KNOWLEDGE_BASE_ID="KB123EXAMPLE"
export DATA_SOURCE_ID="DS456EXAMPLE"
export S3_BUCKET="my-bucket"
export S3_PREFIX="vulnerability-data"
export NVD_API_KEY="your-api-key"

# Run locally
python -c "
from lambda_function import lambda_handler
result = lambda_handler({'scrape_cves': True, 'scrape_kevs': True, 'cve_limit': 10}, None)
print(result)
"
```

Test the scrapers individually:

```python
# Test CVE scraper
from cve_scraper import CVEScraper

scraper = CVEScraper()
cves = scraper.fetch_cves(limit=5)
print(f"Fetched {len(cves)} CVEs")

# Test KEV scraper
from kev_scraper import KEVScraper

kev_scraper = KEVScraper()
kevs = kev_scraper.fetch_kevs()
print(f"Fetched {len(kevs)} KEVs")
```

- Lambda function uses least-privilege IAM permissions
- S3 bucket has encryption enabled and public access blocked
- API keys stored as encrypted environment variables
- CloudWatch logs retained for 30 days
MIT License
Contributions are welcome! Please submit pull requests or open issues for bugs and feature requests.
For issues or questions:
- Check CloudWatch logs for error details
- Review the troubleshooting section
- Open an issue on GitHub